Source code for geonode.security.tests

#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import json
import base64
import logging
import requests
import importlib
import mock

from requests.auth import HTTPBasicAuth
from tastypie.test import ResourceTestCaseMixin

from django.db.models import Q
from django.urls import reverse
from django.conf import settings
from django.http import HttpRequest
from django.test.testcases import TestCase
from django.contrib.auth import get_user_model
from django.test.utils import override_settings
from django.contrib.auth.models import AnonymousUser

from guardian.shortcuts import assign_perm, get_anonymous_user

from geonode import geoserver
from geonode.geoserver.helpers import geofence, gf_utils
from geonode.maps.models import Map
from geonode.layers.models import Dataset
from geonode.documents.models import Document
from geonode.compat import ensure_string
from geonode.utils import check_ogc_backend
from geonode.tests.utils import check_dataset
from geonode.decorators import on_ogc_backend
from geonode.resource.manager import resource_manager
from geonode.tests.base import GeoNodeBaseTestSupport
from geonode.groups.models import Group, GroupMember, GroupProfile
from geonode.layers.populate_datasets_data import create_dataset_data
from geonode.base.auth import create_auth_token, get_or_create_token

from geonode.base.models import Configuration, UserGeoLimit, GroupGeoLimit
from geonode.base.populate_test_data import (
    all_public,
    create_models,
    create_single_doc,
    create_single_map,
    remove_models,
    create_single_dataset,
)
from geonode.geoserver.security import (
    _get_gf_services,
    allow_layer_to_all,
    delete_all_acl_rules,
    sync_resources_with_guardian,
    _get_gwc_filters_and_formats,
    has_geolimits,
    create_geofence_rules,
    delete_acl_rules_for_layer,
)


from .utils import (
    get_users_with_perms,
    get_visible_resources,
)

from .permissions import PermSpec, PermSpecCompact

[docs] logger = logging.getLogger(__name__)
[docs] def _log(msg, *args): logger.debug(msg, *args)
[docs] class StreamToLogger: """ Fake file-like stream object that redirects writes to a logger instance. """ def __init__(self, logger, log_level=logging.INFO):
[docs] self.logger = logger
[docs] self.log_level = log_level
[docs] self.linebuf = ""
[docs] def write(self, buf): for line in buf.rstrip().splitlines(): self.logger.log(self.log_level, line.rstrip())
[docs] class SecurityTests(ResourceTestCaseMixin, GeoNodeBaseTestSupport): """ Tests for the Geonode security app. """ @classmethod
[docs] def setUpClass(cls): super().setUpClass() create_models(type=cls.get_type, integration=cls.get_integration) all_public()
@classmethod
[docs] def tearDownClass(cls): super().tearDownClass() remove_models(cls.get_obj_ids, type=cls.get_type, integration=cls.get_integration)
[docs] def setUp(self): super().setUp() if check_ogc_backend(geoserver.BACKEND_PACKAGE): settings.OGC_SERVER["default"]["ACL_SECURITY_ENABLED"] = True self.maxDiff = None self.user = "admin" self.passwd = "admin" create_dataset_data() self.anonymous_user = get_anonymous_user() self.config = Configuration.load() self.list_url = reverse("api_dispatch_list", kwargs={"api_name": "api", "resource_name": "datasets"}) self.bulk_perms_url = reverse("bulk_permissions") self.perm_spec = {"users": {"admin": ["view_resourcebase"]}, "groups": []}
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_login_middleware(self): """ Tests the Geonode login required authentication middleware. """ from geonode.security.middleware import LoginRequiredMiddleware middleware = LoginRequiredMiddleware(None) white_list = [ reverse("account_ajax_login"), reverse("account_confirm_email", kwargs=dict(key="test")), reverse("account_login"), reverse("account_reset_password"), reverse("forgot_username"), reverse("dataset_acls"), reverse("dataset_resolve_user"), ] black_list = [ reverse("account_signup"), reverse("profile_browse"), ] request = HttpRequest() request.user = get_anonymous_user() # Requests should be redirected to the the `redirected_to` path when un-authenticated user attempts to visit # a black-listed url. for path in black_list: request.path = path response = middleware.process_request(request) if response: self.assertEqual(response.status_code, 302) self.assertTrue(response.get("Location").startswith(middleware.redirect_to)) # The middleware should return None when an un-authenticated user # attempts to visit a white-listed url. for path in white_list: request.path = path response = middleware.process_request(request) self.assertIsNone(response, msg=f"Middleware activated for white listed path: {path}") self.client.login(username="admin", password="admin") admin = get_user_model().objects.get(username="admin") self.assertTrue(admin.is_authenticated) request.user = admin # The middleware should return None when an authenticated user attempts # to visit a black-listed url. for path in black_list: request.path = path response = middleware.process_request(request) self.assertIsNone(response)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_login_middleware_with_basic_auth(self): """ Tests the Geonode login required authentication middleware with Basic authenticated queries """ from geonode.security.middleware import LoginRequiredMiddleware middleware = LoginRequiredMiddleware(None) black_listed_url = reverse("dataset_upload") white_listed_url = reverse("account_login") # unauthorized request to black listed URL should be redirected to `redirect_to` URL request = HttpRequest() request.user = get_anonymous_user() request.path = black_listed_url response = middleware.process_request(request) if response: self.assertEqual(response.status_code, 302) self.assertTrue(response.get("Location").startswith(middleware.redirect_to)) # unauthorized request to white listed URL should be allowed request.path = white_listed_url response = middleware.process_request(request) self.assertIsNone(response, msg=f"Middleware activated for white listed path: {black_listed_url}") # Basic authorized request to black listed URL should be allowed request.path = black_listed_url request.META["HTTP_AUTHORIZATION"] = f'Basic {base64.b64encode(b"bobby:bob").decode("utf-8")}' response = middleware.process_request(request) self.assertIsNone(response, msg=f"Middleware activated for white listed path: {black_listed_url}")
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_login_middleware_with_custom_login_url(self): """ Tests the Geonode login required authentication middleware with Basic authenticated queries """ site_url_settings = [f"{settings.SITEURL}login/custom", "/login/custom", "login/custom"] black_listed_url = reverse("dataset_upload") for setting in site_url_settings: with override_settings(LOGIN_URL=setting): from geonode.security import middleware as mw # reload the middleware module to fetch overridden settings importlib.reload(mw) middleware = mw.LoginRequiredMiddleware(None) # unauthorized request to black listed URL should be redirected to `redirect_to` URL request = HttpRequest() request.user = AnonymousUser() request.path = black_listed_url response = middleware.process_request(request) self.assertIsNotNone(response, "Middleware didn't activate for blacklisted URL.") self.assertEqual(response.status_code, 302) self.assertTrue( response.get("Location").startswith("/"), msg=f"Returned redirection should be a valid path starting '/'. " f"Instead got: {response.get('Location')}", )
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_session_ctrl_middleware(self): """ Tests the Geonode session control authentication middleware. """ from geonode.security.middleware import SessionControlMiddleware from importlib import import_module engine = import_module(settings.SESSION_ENGINE) middleware = SessionControlMiddleware(None) admin = get_user_model().objects.filter(is_superuser=True).first() request = HttpRequest() request.user = admin request.session = engine.SessionStore() request.session["access_token"] = get_or_create_token(admin) request.session.save() middleware.process_request(request) self.assertFalse(request.session.is_empty()) request.session["access_token"] = None request.session.save() middleware.process_request(request) self.assertTrue(request.session.is_empty()) # Test the full cycle through the client path = reverse("account_email") self.client.login(username="admin", password="admin") response = self.client.get(path) self.assertEqual(response.status_code, 200) # Simulating Token expired (or not set) session_id = self.client.cookies.get(settings.SESSION_COOKIE_NAME) session = engine.SessionStore(session_id.value) session["access_token"] = None session.save() response = self.client.get(path) self.assertEqual(response.status_code, 302)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_attributes_sats_refresh(self): layers = Dataset.objects.all()[:2].values_list("id", flat=True) test_dataset = Dataset.objects.get(id=layers[0]) self.client.login(username="admin", password="admin") dataset_attributes = test_dataset.attributes self.assertIsNotNone(dataset_attributes) test_dataset.attribute_set.all().delete() test_dataset.save() data = {"uuid": test_dataset.uuid} resp = self.client.post(reverse("attributes_sats_refresh"), data) if resp.status_code == 200: self.assertHttpOK(resp) self.assertEqual(dataset_attributes.count(), test_dataset.attributes.count()) from geonode.geoserver.helpers import set_attributes_from_geoserver test_dataset.attribute_set.all().delete() test_dataset.save() set_attributes_from_geoserver(test_dataset, overwrite=True) self.assertEqual(dataset_attributes.count(), test_dataset.attributes.count()) # Remove permissions to anonymous users and try to refresh attributes again test_dataset.set_permissions({"users": {"AnonymousUser": []}, "groups": []}) test_dataset.attribute_set.all().delete() test_dataset.save() set_attributes_from_geoserver(test_dataset, overwrite=True) self.assertEqual(dataset_attributes.count(), test_dataset.attributes.count()) else: # If GeoServer is unreachable, this view now returns a 302 error self.assertEqual(resp.status_code, 302)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_invalidate_tileddataset_cache(self): layers = Dataset.objects.all()[:2].values_list("id", flat=True) test_dataset = Dataset.objects.get(id=layers[0]) self.client.login(username="admin", password="admin") data = {"uuid": test_dataset.uuid} resp = self.client.post(reverse("invalidate_tileddataset_cache"), data) self.assertHttpOK(resp)
[docs] def test_set_bulk_permissions(self): """Test that after restrict view permissions on two layers bobby is unable to see them""" rules_count = 0 if check_ogc_backend(geoserver.BACKEND_PACKAGE): delete_all_acl_rules() # Reset GeoFence Rules rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 0) layers = Dataset.objects.all()[:2].values_list("id", flat=True) layers_id = [str(x) for x in layers] test_perm_dataset = Dataset.objects.get(id=layers[0]) self.client.login(username="admin", password="admin") resp = self.client.get(self.list_url) self.assertEqual(len(self.deserialize(resp)["objects"]), 8) data = {"permissions": json.dumps(self.perm_spec), "resources": layers_id} resp = self.client.post(self.bulk_perms_url, data) self.assertHttpOK(resp) if check_ogc_backend(geoserver.BACKEND_PACKAGE): # Check GeoFence Rules have been correctly created rules_count = geofence.get_rules_count() _log(f"1. rules_count: {rules_count} ") self.assertGreaterEqual(rules_count, 10) allow_layer_to_all(test_perm_dataset) rules_count = geofence.get_rules_count() _log(f"2. rules_count: {rules_count} ") self.assertGreaterEqual(rules_count, 11) self.client.logout() if check_ogc_backend(geoserver.BACKEND_PACKAGE): self.client.login(username="bobby", password="bob") resp = self.client.get(self.list_url) self.assertGreaterEqual(len(self.deserialize(resp)["objects"]), 6) # perms = get_users_with_perms(test_perm_dataset) # _log(f"3. perms: {perms} ") # batch = AutoPriorityBatch(get_first_available_priority(), f'test batch for {test_perm_dataset}') # for u, p in perms.items(): # create_geofence_rules(test_perm_dataset, p, user=u, batch=batch) # geofence.run_batch(batch) # Check GeoFence Rules have been correctly created rules_count = geofence.get_rules_count() _log(f"4. rules_count: {rules_count} ") self.assertGreaterEqual(rules_count, 13) # Validate maximum priority available_priority = gf_utils.get_first_available_priority() _log(f"5. available_priority: {available_priority} ") self.assertTrue(available_priority > 0) url = settings.OGC_SERVER["default"]["LOCATION"] user = settings.OGC_SERVER["default"]["USER"] passwd = settings.OGC_SERVER["default"]["PASSWORD"] test_url = f"{url}gwc/rest/seed/{test_perm_dataset.alternate}.json" r = requests.get(test_url, auth=HTTPBasicAuth(user, passwd)) self.assertEqual(r.status_code, 400, f"GWC error for user: {user} URL: {test_url}\n{r.text}") rules_count = 0 if check_ogc_backend(geoserver.BACKEND_PACKAGE): delete_all_acl_rules() # Reset GeoFence Rules rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 0)
[docs] def test_bobby_cannot_set_all(self): """Test that Bobby can set the permissions only on the ones for which he has the right""" bobby = get_user_model().objects.get(username="bobby") layer = Dataset.objects.all().exclude(owner=bobby)[0] self.client.login(username="admin", password="admin") # give bobby the right to change the layer permissions assign_perm("change_resourcebase_permissions", bobby, layer.get_self_resource()) self.client.logout() self.client.login(username="bobby", password="bob") layer2 = Dataset.objects.all().exclude(owner=bobby)[1] data = { "permissions": json.dumps({"users": {"bobby": ["view_resourcebase"]}, "groups": []}), "resources": [layer.id, layer2.id], } resp = self.client.post(self.bulk_perms_url, data) content = resp.content if isinstance(content, bytes): content = content.decode("UTF-8") self.assertNotIn(layer.title, json.loads(content)["not_changed"]) self.assertIn(layer2.title, json.loads(content)["not_changed"])
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_user_can(self): bobby = get_user_model().objects.get(username="bobby") perm_spec = { "users": { "bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_data", "change_dataset_style"] }, "groups": [], } dataset = Dataset.objects.filter(subtype="vector").first() dataset.set_permissions(perm_spec) # Test user has permission with read_only=False self.assertTrue(dataset.user_can(bobby, "change_dataset_style")) # Test with edit permission and read_only=True self.config.read_only = True self.config.save() self.assertFalse(dataset.user_can(bobby, "change_dataset_style")) # Test with view permission and read_only=True self.assertTrue(dataset.user_can(bobby, "view_resourcebase")) # Test on a 'raster' subtype self.config.read_only = False self.config.save() dataset = Dataset.objects.filter(subtype="raster").first() dataset.set_permissions(perm_spec) # Test user has permission with read_only=False self.assertFalse(dataset.user_can(bobby, "change_dataset_data")) self.assertTrue(dataset.user_can(bobby, "change_dataset_style"))
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_perm_specs_synchronization(self): """Test that Dataset is correctly synchronized with guardian: 1. Set permissions to all users 2. Set permissions to a single user 3. Set permissions to a group of users 4. Try to sync a layer from GeoServer """ bobby = get_user_model().objects.get(username="bobby") layer = Dataset.objects.filter(subtype="vector").exclude(owner=bobby).first() self.client.login(username="admin", password="admin") # Reset GeoFence Rules delete_all_acl_rules() self.assertEqual(geofence.get_rules_count(), 0) perm_spec = {"users": {"AnonymousUser": []}, "groups": []} layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() _log(f"1. rules_count: {rules_count} ") self.assertEqual(rules_count, 5) perm_spec = {"users": {"admin": ["view_resourcebase"]}, "groups": []} layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() _log(f"2. rules_count: {rules_count} ") self.assertEqual(rules_count, 9, f"Bad rules count. Got rules: {geofence.get_rules()}") perm_spec = {"users": {"admin": ["change_dataset_data"]}, "groups": []} layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() _log(f"3. rules_count: {rules_count} ") self.assertEqual(rules_count, 8, f"Bad rules count. Got rules: {geofence.get_rules()}") rules_objs = geofence.get_rules() wps_subfield_found = False for rule in rules_objs["rules"]: if rule["service"] == "WPS" and rule["subfield"] == "GS:DOWNLOAD": wps_subfield_found = rule["access"] == "DENY" break self.assertTrue(wps_subfield_found, f"WPS download not blocked. Got rules: {geofence.get_rules()}") # FULL WFS-T perm_spec = { "users": { "bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style", "change_dataset_data"] }, "groups": [], } layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 10) rules_objs = geofence.get_rules() _deny_wfst_rule_exists = False for rule in rules_objs["rules"]: if rule["service"] == "WFS" and rule["userName"] == "bobby" and rule["request"] == "TRANSACTION": _deny_wfst_rule_exists = rule["access"] == "DENY" break self.assertFalse(_deny_wfst_rule_exists) # NO WFS-T # - order is important perm_spec = { "users": { "bobby": [ "view_resourcebase", "download_resourcebase", ] }, "groups": [], } layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 13) rules_objs = geofence.get_rules() _deny_wfst_rule_exists = False _deny_wfst_rule_position = -1 _allow_wfs_rule_position = -1 for cnt, rule in enumerate(rules_objs["rules"]): if rule["service"] == "WFS" and rule["userName"] == "bobby" and rule["request"] == "TRANSACTION": _deny_wfst_rule_exists = rule["access"] == "DENY" _deny_wfst_rule_position = cnt elif ( rule["service"] == "WFS" and rule["userName"] == "bobby" and (rule["request"] is None or rule["request"] == "*") ): _allow_wfs_rule_position = cnt self.assertTrue(_deny_wfst_rule_exists) self.assertTrue(_allow_wfs_rule_position > _deny_wfst_rule_position) # NO WFS perm_spec = { "users": { "bobby": [ "view_resourcebase", ] }, "groups": [], } layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 9) rules_objs = geofence.get_rules() _deny_wfst_rule_exists = False for rule in rules_objs["rules"]: if rule["service"] == "WFS" and rule["userName"] == "bobby" and rule["request"] == "TRANSACTION": _deny_wfst_rule_exists = rule["access"] == "DENY" break self.assertFalse(_deny_wfst_rule_exists) perm_spec = {"users": {}, "groups": {"bar": ["view_resourcebase"]}} layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() _log(f"4. rules_count: {rules_count} ") self.assertEqual(rules_count, 9, f"Bad rule count, got rules {geofence.get_rules()}") perm_spec = {"users": {}, "groups": {"bar": ["change_resourcebase"]}} layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() _log(f"5. rules_count: {rules_count} ") self.assertEqual(rules_count, 5) # Testing GeoLimits # Reset GeoFence Rules delete_all_acl_rules() rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 0) layer = Dataset.objects.first() # grab bobby bobby = get_user_model().objects.get(username="bobby") _disable_dataset_cache = has_geolimits(layer, None, None) filters, formats = _get_gwc_filters_and_formats(_disable_dataset_cache) self.assertListEqual(filters, [{"styleParameterFilter": {"STYLES": ""}}]) self.assertListEqual( formats, [ "application/json;type=utfgrid", "image/gif", "image/jpeg", "image/png", "image/png8", "image/vnd.jpeg-png", "image/vnd.jpeg-png8", ], ) geo_limit, _ = UserGeoLimit.objects.get_or_create(user=bobby, resource=layer.get_self_resource()) geo_limit.wkt = "SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, \ 146.7000276171853 -42.53655428642583, 146.7110139453067 -43.07256577359489, \ 145.9804231249952 -43.05651288026286, 145.8046418749977 -42.49606500060302)))" geo_limit.save() layer.users_geolimits.add(geo_limit) self.assertEqual(layer.users_geolimits.all().count(), 1) _disable_dataset_cache = has_geolimits(layer, bobby, None) filters, formats = _get_gwc_filters_and_formats([_disable_dataset_cache]) self.assertIsNone(filters) self.assertIsNone(formats) perm_spec = {"users": {"bobby": ["view_resourcebase"]}, "groups": []} layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 10) rules_objs = geofence.get_rules() self.assertEqual(len(rules_objs["rules"]), 10) # Order is important _limit_rule_position = -1 for cnt, rule in enumerate(rules_objs["rules"]): if rule["service"] is None and rule["userName"] == "bobby": self.assertEqual(rule["userName"], "bobby") self.assertEqual(rule["workspace"], "geonode") self.assertEqual(rule["layer"], "CA") self.assertEqual(rule["access"], "LIMIT") self.assertTrue("limits" in rule) rule_limits = rule["limits"] self.assertEqual( rule_limits["allowedArea"], "SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, \ 146.7000276171853 -42.53655428642583, 146.7110139453067 -43.07256577359489, \ 145.9804231249952 -43.05651288026286, 145.8046418749977 -42.49606500060302)))", ) self.assertEqual(rule_limits["catalogMode"], "MIXED") _limit_rule_position = cnt elif rule["userName"] == "bobby": # When there's a limit rule, "*" must be the first one self.assertTrue(_limit_rule_position < cnt) geo_limit, _ = GroupGeoLimit.objects.get_or_create( group=GroupProfile.objects.get(group__name="bar"), resource=layer.get_self_resource() ) geo_limit.wkt = "SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, \ 146.7000276171853 -42.53655428642583, 146.7110139453067 -43.07256577359489, \ 145.9804231249952 -43.05651288026286, 145.8046418749977 -42.49606500060302)))" geo_limit.save() layer.groups_geolimits.add(geo_limit) self.assertEqual(layer.groups_geolimits.all().count(), 1) perm_spec = {"users": {}, "groups": {"bar": ["change_resourcebase"]}} layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 6) rules_objs = geofence.get_rules() self.assertEqual(len(rules_objs["rules"]), 6) # Order is important _limit_rule_position = -1 for cnt, rule in enumerate(rules_objs["rules"]): if rule["roleName"] == "ROLE_BAR": if rule["service"] is None: self.assertEqual(rule["userName"], None) self.assertEqual(rule["workspace"], "geonode") self.assertEqual(rule["layer"], "CA") self.assertEqual(rule["access"], "LIMIT") self.assertTrue("limits" in rule) rule_limits = rule["limits"] self.assertEqual( rule_limits["allowedArea"], "SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, \ 146.7000276171853 -42.53655428642583, 146.7110139453067 -43.07256577359489, \ 145.9804231249952 -43.05651288026286, 145.8046418749977 -42.49606500060302)))", ) self.assertEqual(rule_limits["catalogMode"], "MIXED") _limit_rule_position = cnt else: # When there's a limit rule, "*" must be the first one self.assertTrue(_limit_rule_position < cnt) # Change Dataset Type and SRID in order to force GeoFence allowed-area reprojection _original_subtype = layer.subtype _original_srid = layer.srid layer.subtype = "raster" layer.srid = "EPSG:3857" layer.save() layer.set_permissions(perm_spec) rules_count = geofence.get_rules_count() rules_objs = geofence.get_rules() # Order is important _limit_rule_position = -1 for cnt, rule in enumerate(rules_objs["rules"]): if rule["roleName"] == "ROLE_BAR": if rule["service"] is None: self.assertEqual(rule["service"], None) self.assertEqual(rule["userName"], None) self.assertEqual(rule["workspace"], "geonode") self.assertEqual(rule["layer"], "CA") self.assertEqual(rule["access"], "LIMIT") self.assertTrue("limits" in rule) rule_limits = rule["limits"] self.assertEqual( rule_limits["allowedArea"], "SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, 146.7000276171853 \ -42.53655428642583, 146.7110139453067 -43.07256577359489, 145.9804231249952 \ -43.05651288026286, 145.8046418749977 -42.49606500060302)))", ) self.assertEqual(rule_limits["catalogMode"], "MIXED") _limit_rule_position = cnt else: # When there's a limit rule, "*" must be the first one self.assertTrue(_limit_rule_position < cnt) layer.subtype = _original_subtype layer.srid = _original_srid layer.save() # Reset GeoFence Rules delete_all_acl_rules() rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 0)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs] def test_dataset_permissions(self): # Test permissions on a layer bobby = get_user_model().objects.get(username="bobby") layer = create_single_dataset("san_andres_y_providencia_poi") layer = resource_manager.update( layer.uuid, instance=layer, notify=False, vals=dict(owner=bobby, workspace=settings.DEFAULT_WORKSPACE) ) self.assertIsNotNone(layer) self.assertIsNotNone(layer.ows_url) self.assertIsNotNone(layer.ptype) self.assertIsNotNone(layer.sourcetype) self.assertEqual(layer.alternate, "geonode:san_andres_y_providencia_poi") # Reset GeoFence Rules delete_all_acl_rules() rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 0) layer = Dataset.objects.get(name="san_andres_y_providencia_poi") # removing duplicates while Dataset.objects.filter(alternate=layer.alternate).count() > 1: Dataset.objects.filter(alternate=layer.alternate).last().delete() layer = Dataset.objects.get(alternate=layer.alternate) layer.set_default_permissions(owner=bobby) check_dataset(layer) rules_count = geofence.get_rules_count() _log(f"0. rules_count: {rules_count} ") self.assertGreaterEqual(rules_count, 4) # Set the layer private for not authenticated users perm_spec = {"users": {"AnonymousUser": []}, "groups": []} layer.set_permissions(perm_spec) url = ( f"{settings.GEOSERVER_LOCATION}ows?" "LAYERS=geonode%3Asan_andres_y_providencia_poi&STYLES=" "&FORMAT=image%2Fpng&SERVICE=WMS&VERSION=1.1.1&REQUEST=GetMap" "&SRS=EPSG%3A4326" "&BBOX=-81.394599749999,13.316009005566," "-81.370560451855,13.372728455566" "&WIDTH=217&HEIGHT=512" ) # test view_resourcebase permission on anonymous user response = requests.get(url) self.assertTrue(response.status_code, 404) self.assertEqual(response.headers.get("Content-Type"), "application/vnd.ogc.se_xml;charset=UTF-8") # test WMS with authenticated user that has access to the Dataset response = requests.get( url, auth=HTTPBasicAuth( username=settings.OGC_SERVER["default"]["USER"], password=settings.OGC_SERVER["default"]["PASSWORD"] ), ) self.assertTrue(response.status_code, 200) self.assertEqual(response.headers.get("Content-Type"), "image/png") # test WMS with authenticated user that has no view_resourcebase: # the layer should be not accessible response = requests.get(url, auth=HTTPBasicAuth(username="norman", password="norman")) self.assertTrue(response.status_code, 404) self.assertEqual(response.headers.get("Content-Type").strip().replace(" ", ""), "text/html;charset=utf-8") # test change_dataset_style url = f"{settings.GEOSERVER_LOCATION}rest/workspaces/geonode/styles/san_andres_y_providencia_poi.xml" sld = """<?xml version="1.0" encoding="UTF-8"?> <sld:StyledLayerDescriptor xmlns:sld="http://www.opengis.net/sld" xmlns:gml="http://www.opengis.net/gml" xmlns:ogc="http://www.opengis.net/ogc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="1.0.0" xsi:schemaLocation="http://www.opengis.net/sld http://schemas.opengis.net/sld/1.0.0/StyledLayerDescriptor.xsd"> <sld:NamedLayer> <sld:Name>geonode:san_andres_y_providencia_poi</sld:Name> <sld:UserStyle> <sld:Name>san_andres_y_providencia_poi</sld:Name> <sld:Title>san_andres_y_providencia_poi</sld:Title> <sld:IsDefault>1</sld:IsDefault> <sld:FeatureTypeStyle> <sld:Rule> <sld:PointSymbolizer> <sld:Graphic> <sld:Mark> <sld:Fill> <sld:CssParameter name="fill">#8A7700 </sld:CssParameter> </sld:Fill> <sld:Stroke> <sld:CssParameter name="stroke">#bbffff </sld:CssParameter> </sld:Stroke> </sld:Mark> <sld:Size>10</sld:Size> </sld:Graphic> </sld:PointSymbolizer> </sld:Rule> </sld:FeatureTypeStyle> </sld:UserStyle> </sld:NamedLayer> </sld:StyledLayerDescriptor>""" # user without change_dataset_style cannot edit it self.assertTrue(self.client.login(username="bobby", password="bob")) response = self.client.put(url, sld, content_type="application/vnd.ogc.sld+xml") self.assertEqual(response.status_code, 404) # user with change_dataset_style can edit it perm_spec = {"users": {"bobby": ["view_resourcebase", "change_resourcebase"]}, "groups": []} layer.set_permissions(perm_spec) response = self.client.put(url, sld, content_type="application/vnd.ogc.sld+xml") # _content_type = response.getheader('Content-Type') # self.assertEqual(_content_type, 'image/png') # Reset GeoFence Rules delete_all_acl_rules() rules_count = geofence.get_rules_count() self.assertTrue(rules_count == 0)
[docs] def test_maplayers_default_permissions(self): """Verify that Dataset.set_default_permissions is behaving as expected""" # Get a Dataset object to work with layer = Dataset.objects.first() # removing duplicates while Dataset.objects.filter(alternate=layer.alternate).count() > 1: Dataset.objects.filter(alternate=layer.alternate).last().delete() layer = Dataset.objects.get(alternate=layer.alternate) # Set the default permissions layer.set_default_permissions() # Test that the anonymous user can read self.assertTrue(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource())) # Test that the owner user can read self.assertTrue(layer.owner.has_perm("view_resourcebase", layer.get_self_resource())) # Test that the owner user can download it self.assertTrue(layer.owner.has_perm("download_resourcebase", layer.get_self_resource())) # Test that the owner user can edit metadata self.assertTrue(layer.owner.has_perm("change_resourcebase_metadata", layer.get_self_resource())) # Test that the owner user can edit data if is vector type if layer.subtype == "vector": self.assertTrue(layer.owner.has_perm("change_dataset_data", layer)) # Test that the owner user can edit styles self.assertTrue(layer.owner.has_perm("change_dataset_style", layer)) # Test that the owner can manage the layer self.assertTrue(layer.owner.has_perm("change_resourcebase", layer.get_self_resource())) self.assertTrue(layer.owner.has_perm("delete_resourcebase", layer.get_self_resource())) self.assertTrue(layer.owner.has_perm("change_resourcebase_permissions", layer.get_self_resource())) self.assertTrue(layer.owner.has_perm("publish_resourcebase", layer.get_self_resource()))
[docs] def test_set_dataset_permissions(self): """Verify that the set_dataset_permissions view is behaving as expected""" # Get a layer to work with layer = Dataset.objects.first() # removing duplicates while Dataset.objects.filter(alternate=layer.alternate).count() > 1: Dataset.objects.filter(alternate=layer.alternate).last().delete() layer = Dataset.objects.get(alternate=layer.alternate) # FIXME Test a comprehensive set of permissions specifications # Set the Default Permissions layer.set_default_permissions() # Test that the Permissions for anonymous user are set self.assertTrue(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource())) # Set the Permissions layer.set_permissions(self.perm_spec) # Test that the Permissions for anonymous user are un-set self.assertFalse(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource())) # Test that previous permissions for users other than ones specified in # the perm_spec (and the layers owner) were removed current_perms = layer.get_all_level_info() self.assertGreaterEqual(len(current_perms["users"]), 1) # Test that there are no duplicates on returned permissions for _k, _v in current_perms.items(): for _kk, _vv in current_perms[_k].items(): if _vv and isinstance(_vv, list): _vv_1 = _vv.copy() _vv_2 = list(set(_vv.copy())) _vv_1.sort() _vv_2.sort() self.assertListEqual(_vv_1, _vv_2) # Test that the User permissions specified in the perm_spec were # applied properly for username, perm in self.perm_spec["users"].items(): user = get_user_model().objects.get(username=username) self.assertTrue(user.has_perm(perm, layer.get_self_resource()))
[docs] def test_ajax_dataset_permissions(self): """Verify that the ajax_dataset_permissions view is behaving as expected""" # Setup some layer names to work with valid_dataset_typename = Dataset.objects.all().first().id invalid_dataset_id = 9999999 # Test that an invalid layer.alternate is handled for properly response = self.client.post( reverse("resource_permissions", args=(invalid_dataset_id,)), data=json.dumps(self.perm_spec), content_type="application/json", ) self.assertEqual(response.status_code, 404) # Test that GET returns permissions response = self.client.get(reverse("resource_permissions", args=(valid_dataset_typename,))) assert "permissions" in ensure_string(response.content) # Test that a user is required to have maps.change_dataset_permissions # First test un-authenticated response = self.client.post( reverse("resource_permissions", args=(valid_dataset_typename,)), data=json.dumps(self.perm_spec), content_type="application/json", ) self.assertEqual(response.status_code, 401) # Next Test with a user that does NOT have the proper perms self.assertTrue(self.client.login(username="norman", password="norman")) response = self.client.post( reverse("resource_permissions", args=(valid_dataset_typename,)), data=json.dumps(self.perm_spec), content_type="application/json", ) self.assertEqual(response.status_code, 401) # Login as a user with the proper permission and test the endpoint self.assertTrue(self.client.login(username="admin", password="admin")) response = self.client.post( reverse("resource_permissions", args=(valid_dataset_typename,)), data=json.dumps(self.perm_spec), content_type="application/json", ) # Test that the method returns 200 self.assertEqual(response.status_code, 200)
# Test that the permissions specification is applied # Should we do this here, or assume the tests in # test_set_dataset_permissions will handle for that?
[docs] def test_perms_info(self): """Verify that the perms_info view is behaving as expected""" # Test with a Dataset object layer = Dataset.objects.first() # removing duplicates while Dataset.objects.filter(alternate=layer.alternate).count() > 1: Dataset.objects.filter(alternate=layer.alternate).last().delete() layer = Dataset.objects.get(alternate=layer.alternate) layer.set_default_permissions() # Test that the anonymous user can read self.assertTrue(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource())) # Test that layer owner can edit layer self.assertTrue(layer.owner.has_perm("change_resourcebase", layer.get_self_resource())) # Test with a Map object a_map = Map.objects.first() a_map.set_default_permissions() perms = get_users_with_perms(a_map) self.assertIsNotNone(perms) self.assertGreaterEqual(len(perms), 1)
# now we test permissions, first on an authenticated user and then on the # anonymous user # 1. view_resourcebase # 2. change_resourcebase # 3. delete_resourcebase # 4. change_resourcebase_metadata # 5. change_resourcebase_permissions # 6. change_dataset_data # 7. change_dataset_style
[docs] def test_not_superuser_permissions(self): rules_count = 0 if check_ogc_backend(geoserver.BACKEND_PACKAGE): delete_all_acl_rules() # Reset GeoFence Rules rules_count = geofence.get_rules_count() self.assertTrue(rules_count == 0) # grab bobby bob = get_user_model().objects.get(username="bobby") # grab a layer layer = Dataset.objects.exclude(owner=bob).first() # removing duplicates while Dataset.objects.filter(alternate=layer.alternate).count() > 1: Dataset.objects.filter(alternate=layer.alternate).last().delete() layer = Dataset.objects.get(alternate=layer.alternate) layer.set_default_permissions() # verify bobby has view permissions on it self.assertTrue(bob.has_perm("view_resourcebase", layer.get_self_resource())) if check_ogc_backend(geoserver.BACKEND_PACKAGE): # Check GeoFence Rules have been correctly created rules_count = geofence.get_rules_count() _log(f"1. rules_count: {rules_count} ") self.assertTrue(self.client.login(username="bobby", password="bob")) # 1. view_resourcebase # 1.1 has view_resourcebase: verify that bobby can access the layer # detail page self.assertTrue(bob.has_perm("view_resourcebase", layer.get_self_resource())) response = self.client.get(reverse("dataset_embed", args=(layer.alternate,))) self.assertEqual(response.status_code, 200, response.status_code) # 1.2 has not view_resourcebase: verify that bobby can not access the # layer detail page layer.set_permissions({"users": {"AnonymousUser": []}, "groups": []}) Group.objects.get(name="anonymous") response = self.client.get(reverse("dataset_embed", args=(layer.alternate,))) self.assertTrue(response.status_code in (401, 403), response.status_code) # 2. change_resourcebase # 2.1 has not change_resourcebase: verify that bobby cannot access the # layer replace page response = self.client.get(reverse("dataset_replace", args=(layer.alternate,))) self.assertTrue(response.status_code in (401, 403), response.status_code) # 2.2 has change_resourcebase: verify that bobby can access the layer # replace page layer.set_permissions({"users": {"bobby": ["change_resourcebase"]}, "groups": []}) self.assertTrue(bob.has_perm("change_resourcebase", layer.get_self_resource())) response = self.client.get(reverse("dataset_replace", args=(layer.alternate,))) self.assertEqual(response.status_code, 200, response.status_code) # 3. change_resourcebase_metadata # 3.1 has not change_resourcebase_metadata: verify that bobby cannot # access the layer metadata page response = self.client.get(reverse("dataset_metadata", args=(layer.alternate,))) self.assertTrue(response.status_code in (401, 403), response.status_code) # 3.2 has delete_resourcebase: verify that bobby can access the layer # delete page layer.set_permissions( { "users": {"bobby": ["change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase"]}, "groups": [], } ) self.assertTrue(bob.has_perm("change_resourcebase_metadata", layer.get_self_resource())) response = self.client.get(reverse("dataset_metadata", args=(layer.alternate,))) self.assertEqual(response.status_code, 200, response.status_code) if check_ogc_backend(geoserver.BACKEND_PACKAGE): perms = get_users_with_perms(layer) _log(f"2. perms: {perms} ") batch = create_geofence_rules(layer, perms, user=bob) geofence.run_batch(batch) # Check GeoFence Rules have been correctly created rules_count = geofence.get_rules_count() _log(f"3. rules_count: {rules_count} ") # 4. change_resourcebase_permissions # should be impossible for the user without change_resourcebase_permissions # to change permissions as the permission form is not available in the # layer detail page? # 5. change_dataset_data # must be done in integration test sending a WFS-T request with CURL # 6. change_dataset_style # 6.1 has not change_dataset_style: verify that bobby cannot access # the layer style page if check_ogc_backend(geoserver.BACKEND_PACKAGE): # Only for geoserver backend response = self.client.get(reverse("dataset_style_manage", args=(layer.alternate,))) self.assertTrue(response.status_code in (401, 403), response.status_code) # 7.2 has change_dataset_style: verify that bobby can access the # change layer style page if check_ogc_backend(geoserver.BACKEND_PACKAGE): # Only for geoserver backend layer.set_permissions( { "users": { "bobby": [ "change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase", "change_dataset_style", ] }, "groups": [], } ) self.assertTrue(bob.has_perm("change_dataset_style", layer)) response = self.client.get(reverse("dataset_style_manage", args=(layer.alternate,))) self.assertEqual(response.status_code, 200, response.status_code) rules_count = 0 if check_ogc_backend(geoserver.BACKEND_PACKAGE): delete_all_acl_rules() # Reset GeoFence Rules rules_count = geofence.get_rules_count() self.assertEqual(rules_count, 0, rules_count)
[docs] def test_anonymus_permissions(self): # grab a layer layer = Dataset.objects.first() # removing duplicates while Dataset.objects.filter(alternate=layer.alternate).count() > 1: Dataset.objects.filter(alternate=layer.alternate).last().delete() layer = Dataset.objects.get(alternate=layer.alternate) layer.set_default_permissions() # 1. view_resourcebase # 1.1 has view_resourcebase: verify that anonymous user can access # the layer detail page self.assertTrue(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource())) response = self.client.get(reverse("dataset_embed", args=(layer.alternate,))) self.assertEqual(response.status_code, 200) # 1.2 has not view_resourcebase: verify that anonymous user can not # access the layer detail page layer.set_permissions({"users": {"AnonymousUser": []}, "groups": []}) response = self.client.get(reverse("dataset_embed", args=(layer.alternate,))) self.assertTrue(response.status_code in (302, 403)) # 2. change_resourcebase # 2.1 has not change_resourcebase: verify that anonymous user cannot # access the layer replace page but redirected to login response = self.client.get(reverse("dataset_replace", args=(layer.alternate,))) self.assertTrue(response.status_code in (302, 403)) # 3. change_resourcebase_metadata # 3.1 has not change_resourcebase_metadata: verify that anonymous user # cannot access the layer metadata page but redirected to login response = self.client.get(reverse("dataset_metadata", args=(layer.alternate,))) self.assertTrue(response.status_code in (302, 403)) # 4. change_dataset_style # 4.1 has not change_dataset_style: verify that anonymous user cannot access # the layer style page but redirected to login if check_ogc_backend(geoserver.BACKEND_PACKAGE): # Only for geoserver backend response = self.client.get(reverse("dataset_style_manage", args=(layer.alternate,))) self.assertTrue(response.status_code in (302, 403))
[docs] def test_get_visible_resources_should_return_resource_with_metadata_only_false(self): layers = Dataset.objects.all() actual = get_visible_resources(queryset=layers, user=get_user_model().objects.get(username=self.user)) self.assertEqual(8, len(actual))
[docs] def test_get_visible_resources_should_return_updated_resource_with_metadata_only_false(self): # Updating the layer with metadata only True to verify that the filter works Dataset.objects.filter(title="dataset metadata true").update(metadata_only=False) layers = Dataset.objects.all() actual = get_visible_resources(queryset=layers, user=get_user_model().objects.get(username=self.user)) self.assertEqual(layers.filter(dirty_state=False).count(), len(actual))
[docs] def test_get_visible_resources_should_return_resource_with_metadata_only_true(self): """ If metadata only is provided, it should return only the metadata resources """ try: dataset = create_single_dataset("dataset_with_metadata_only_True") dataset.metadata_only = True dataset.save() layers = Dataset.objects.all() actual = get_visible_resources( queryset=layers, metadata_only=True, user=get_user_model().objects.get(username=self.user) ) self.assertEqual(1, actual.count()) finally: if dataset: dataset.delete()
[docs] def test_get_visible_resources_should_return_resource_with_metadata_only_none(self): """ If metadata only is provided, it should return only the metadata resources """ try: dataset = create_single_dataset("dataset_with_metadata_only_True") dataset.metadata_only = True dataset.save() layers = Dataset.objects.all() actual = get_visible_resources( queryset=layers, metadata_only=None, user=get_user_model().objects.get(username=self.user) ) self.assertEqual(layers.count(), actual.count()) finally: if dataset: dataset.delete()
@override_settings(ADMIN_MODERATE_UPLOADS=True, RESOURCE_PUBLISHING=True, GROUP_PRIVATE_RESOURCES=True)
[docs] def test_get_visible_resources_advanced_workflow(self): admin_user = get_user_model().objects.get(username="admin") standard_user = get_user_model().objects.get(username="bobby") self.assertIsNotNone(admin_user) self.assertIsNotNone(standard_user) admin_user.is_superuser = True admin_user.save() layers = Dataset.objects.all() actual = get_visible_resources( queryset=Dataset.objects.all(), user=admin_user, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) # The method returns only 'metadata_only=False' resources self.assertEqual(layers.count(), actual.count()) actual = get_visible_resources( queryset=Dataset.objects.all(), user=standard_user, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) # The method returns only 'metadata_only=False' resources self.assertEqual(layers.count(), actual.count()) # Test 'is_approved=False' 'is_published=False' Dataset.objects.filter(~Q(owner=standard_user)).update(is_approved=False, is_published=False) actual = get_visible_resources( queryset=Dataset.objects.all(), user=admin_user, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) # The method returns only 'metadata_only=False' resources self.assertEqual(layers.count(), actual.count()) actual = get_visible_resources( queryset=Dataset.objects.all(), user=standard_user, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) # The method returns only 'metadata_only=False' resources self.assertEqual(layers.count(), actual.count()) actual = get_visible_resources( queryset=Dataset.objects.all(), user=None, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) # The method returns only 'metadata_only=False' resources self.assertEqual(1, actual.count()) # Test private groups private_groups = GroupProfile.objects.filter(access="private") if private_groups.first(): private_groups.first().leave(standard_user) Dataset.objects.filter(~Q(owner=standard_user)).update(group=private_groups.first().group) actual = get_visible_resources( queryset=Dataset.objects.all(), user=admin_user, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) # The method returns only 'metadata_only=False' resources self.assertEqual(layers.count(), actual.count()) actual = get_visible_resources( queryset=Dataset.objects.all(), user=standard_user, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) # The method returns only 'metadata_only=False' resources self.assertEqual(8, actual.count()) actual = get_visible_resources( queryset=Dataset.objects.all(), user=None, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) # The method returns only 'metadata_only=False' resources self.assertEqual(1, actual.count())
[docs] def test_get_visible_resources(self): standard_user = get_user_model().objects.get(username="bobby") layers = Dataset.objects.all() # update user's perm on a layer, # this should not return the layer since it will not be in user's allowed resources _title = "common bar" for x in Dataset.objects.filter(title=_title): x.set_permissions({"users": {"bobby": []}, "groups": []}) actual = get_visible_resources( queryset=layers, user=standard_user, admin_approval_required=True, unpublished_not_visible=True, private_groups_not_visibile=True, ) self.assertNotIn(_title, list(actual.values_list("title", flat=True))) # get layers as admin, this should return all layers with metadata_only = True actual = get_visible_resources(queryset=layers, user=get_user_model().objects.get(username=self.user)) self.assertIn(_title, list(actual.values_list("title", flat=True)))
[docs] def test_perm_spec_conversion(self): """ Perm Spec from extended to cmpact and viceversa """ standard_user = get_user_model().objects.get(username="bobby") dataset = Dataset.objects.filter(owner=standard_user).first() perm_spec = { "users": {"bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style"]}, "groups": {}, } _p = PermSpec(perm_spec, dataset) self.assertDictEqual( json.loads(str(_p)), {"users": {"bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style"]}, "groups": {}}, ) self.assertDictEqual( _p.compact, { "users": [ { "id": standard_user.id, "username": standard_user.username, "first_name": standard_user.first_name, "last_name": standard_user.last_name, "avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240", "permissions": "owner", "is_staff": False, "is_superuser": False, }, { "avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240", "first_name": "admin", "id": 1, "last_name": "", "permissions": "manage", "username": "admin", "is_staff": True, "is_superuser": True, }, ], "organizations": [], "groups": [ {"id": 3, "title": "anonymous", "name": "anonymous", "permissions": "none"}, {"id": 2, "name": "registered-members", "permissions": "none", "title": "Registered Members"}, ], }, ) perm_spec = { "users": { "AnonymousUser": ["view_resourcebase"], "bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style"], }, "groups": {}, } _p = PermSpec(perm_spec, dataset) self.assertDictEqual( json.loads(str(_p)), { "users": { "AnonymousUser": ["view_resourcebase"], "bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style"], }, "groups": {}, }, ) self.assertDictEqual( _p.compact, { "users": [ { "id": standard_user.id, "username": standard_user.username, "first_name": standard_user.first_name, "last_name": standard_user.last_name, "avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240", "permissions": "owner", "is_staff": False, "is_superuser": False, }, { "avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240", "first_name": "admin", "id": 1, "last_name": "", "permissions": "manage", "username": "admin", "is_staff": True, "is_superuser": True, }, ], "organizations": [], "groups": [ {"id": 3, "title": "anonymous", "name": "anonymous", "permissions": "view"}, {"id": 2, "name": "registered-members", "permissions": "none", "title": "Registered Members"}, ], }, ) self.assertTrue(PermSpecCompact.validate(_p.compact)) _pp = PermSpecCompact(_p.compact, dataset) _pp_e = { "users": { "bobby": [ "change_dataset_style", "publish_resourcebase", "delete_resourcebase", "change_resourcebase_metadata", "download_resourcebase", "change_resourcebase", "change_resourcebase_permissions", "view_resourcebase", "change_dataset_data", ], "admin": [ "change_dataset_style", "publish_resourcebase", "delete_resourcebase", "change_resourcebase_metadata", "download_resourcebase", "change_resourcebase", "change_resourcebase_permissions", "view_resourcebase", "change_dataset_data", ], "AnonymousUser": ["view_resourcebase"], }, "groups": {"anonymous": ["view_resourcebase"], "registered-members": []}, } self.assertListEqual(list(_pp.extended.keys()), list(_pp_e.keys())) for _key in _pp.extended.keys(): self.assertListEqual(list(_pp.extended.get(_key).keys()), list(_pp_e.get(_key).keys())) for __key in _pp.extended.get(_key).keys(): self.assertListEqual( sorted(list(set(_pp.extended.get(_key).get(__key)))), sorted(list(set(_pp_e.get(_key).get(__key)))) ) _pp2 = PermSpecCompact( { "users": [ { "id": standard_user.id, "username": standard_user.username, "first_name": standard_user.first_name, "last_name": standard_user.last_name, "avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240", "permissions": "view", } ] }, dataset, ) _pp.merge(_pp2) _pp_e = { "users": { "bobby": [ "change_resourcebase_permissions", "change_resourcebase_metadata", "change_dataset_data", "change_resourcebase", "delete_resourcebase", "publish_resourcebase", "download_resourcebase", "change_dataset_style", "view_resourcebase", ], "admin": [ "change_resourcebase_permissions", "change_resourcebase_metadata", "change_dataset_data", "change_resourcebase", "delete_resourcebase", "publish_resourcebase", "download_resourcebase", "change_dataset_style", "view_resourcebase", ], "AnonymousUser": ["view_resourcebase"], }, "groups": {"anonymous": ["view_resourcebase"], "registered-members": []}, } self.assertListEqual(list(_pp.extended.keys()), list(_pp_e.keys())) for _key in _pp.extended.keys(): self.assertListEqual(list(_pp.extended.get(_key).keys()), list(_pp_e.get(_key).keys())) for __key in _pp.extended.get(_key).keys(): self.assertListEqual( sorted(list(set(_pp.extended.get(_key).get(__key)))), sorted(list(set(_pp_e.get(_key).get(__key)))) ) # Test "download" permissions retention policy # 1. "download" permissions are allowed on "Documents" test_document = Document.objects.first() perm_spec = { "users": { "bobby": [ "view_resourcebase", "download_resourcebase", ] }, "groups": {}, } _p = PermSpec(perm_spec, test_document) self.assertDictEqual( json.loads(str(_p)), { "users": { "bobby": [ "view_resourcebase", "download_resourcebase", ] }, "groups": {}, }, json.loads(str(_p)), ) self.assertDictEqual( _p.compact, { "users": [ { "id": standard_user.id, "username": standard_user.username, "first_name": standard_user.first_name, "last_name": standard_user.last_name, "avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240", "permissions": "download", "is_staff": False, "is_superuser": False, }, { "avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240", "first_name": "admin", "id": 1, "last_name": "", "permissions": "owner", "username": "admin", "is_staff": True, "is_superuser": True, }, ], "organizations": [], "groups": [ {"id": 3, "title": "anonymous", "name": "anonymous", "permissions": "none"}, {"id": 2, "name": "registered-members", "permissions": "none", "title": "Registered Members"}, ], }, _p.compact, ) # 2. "download" permissions are NOT allowed on "Maps" test_map = Map.objects.first() perm_spec = { "users": { "bobby": [ "view_resourcebase", "download_resourcebase", ] }, "groups": {}, } _p = PermSpec(perm_spec, test_map) self.assertDictEqual( json.loads(str(_p)), { "users": { "bobby": [ "view_resourcebase", "download_resourcebase", ] }, "groups": {}, }, json.loads(str(_p)), ) self.assertDictEqual( _p.compact, { "users": [ { "id": standard_user.id, "username": standard_user.username, "first_name": standard_user.first_name, "last_name": standard_user.last_name, "avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240", "permissions": "view", "is_staff": False, "is_superuser": False, }, { "avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240", "first_name": "admin", "id": 1, "last_name": "", "permissions": "owner", "username": "admin", "is_staff": True, "is_superuser": True, }, ], "organizations": [], "groups": [ {"id": 3, "title": "anonymous", "name": "anonymous", "permissions": "none"}, {"id": 2, "name": "registered-members", "permissions": "none", "title": "Registered Members"}, ], }, _p.compact, )
[docs] def test_admin_whitelisted_access_backend(self): from geonode.security.backends import AdminRestrictedAccessBackend from django.core.exceptions import PermissionDenied backend = AdminRestrictedAccessBackend() with self.settings(ADMIN_IP_WHITELIST=["88.88.88.88"]): with self.assertRaises(PermissionDenied): backend.authenticate(HttpRequest(), username="admin", password="admin") with self.settings(ADMIN_IP_WHITELIST=[]): request = HttpRequest() request.META["REMOTE_ADDR"] = "127.0.0.1" user = backend.authenticate(request, username="admin", password="admin") self.assertIsNone(user)
[docs] def test_admin_whitelisted_access_middleware(self): from geonode.security.middleware import AdminAllowedMiddleware get_response = mock.MagicMock() middleware = AdminAllowedMiddleware(get_response) admin = get_user_model().objects.filter(is_superuser=True).first() # Test invalid IP with self.settings(ADMIN_IP_WHITELIST=["88.88.88.88"]): request = HttpRequest() request.user = admin request.path = reverse("home") request.META["REMOTE_ADDR"] = "127.0.0.1" middleware.process_request(request) self.assertEqual(request.user, AnonymousUser()) request = HttpRequest() basic_auth = base64.b64encode(b"admin:admin").decode() request.META["HTTP_AUTHORIZATION"] = f"Basic {basic_auth}" request.path = reverse("home") request.META["REMOTE_ADDR"] = "127.0.0.1" middleware.process_request(request) self.assertIsNone(request.META.get("HTTP_AUTHORIZATION")) token = create_auth_token(admin) request.META["HTTP_AUTHORIZATION"] = f"Bearer {token}" middleware.process_request(request) self.assertIsNone(request.META.get("HTTP_AUTHORIZATION")) with self.settings(ADMIN_IP_WHITELIST=[]): request = HttpRequest() request.user = admin request.path = reverse("home") request.META["REMOTE_ADDR"] = "127.0.0.1" middleware.process_request(request) self.assertTrue(request.user.is_superuser) # Test valid IP with self.settings(ADMIN_IP_WHITELIST=["127.0.0.1"]): request = HttpRequest() request.user = admin request.path = reverse("home") request.META["REMOTE_ADDR"] = "127.0.0.1" middleware.process_request(request) self.assertTrue(request.user.is_superuser) # Test range of whitelisted IPs with self.settings(ADMIN_IP_WHITELIST=["127.0.0.0/24"]): request = HttpRequest() request.user = admin request.path = reverse("home") request.META["REMOTE_ADDR"] = "127.0.0.1" middleware.process_request(request) self.assertTrue(request.user.is_superuser) # Test valid IP in second element with self.settings(ADMIN_IP_WHITELIST=["88.88.88.88", "127.0.0.1"]): request = HttpRequest() request.user = admin request.path = reverse("home") request.META["REMOTE_ADDR"] = "127.0.0.1" middleware.process_request(request) self.assertTrue(request.user.is_superuser)
[docs] class SecurityRulesTests(TestCase): """ Test resources synchronization with Guardian and dirty states cleaning """
[docs] def setUp(self): self.maxDiff = None self._l = create_single_dataset("test_dataset")
[docs] def test_sync_resources_with_guardian_delay_false(self): with self.settings(DELAYED_SECURITY_SIGNALS=False, ACL_SECURITY_ENABLED=True): delete_acl_rules_for_layer(self._l) # Set geofence (and so the dirty state) allow_layer_to_all(self._l) # Retrieve the same layer dirty_dataset = Dataset.objects.get(pk=self._l.id) # Check dirty state (True) self.assertFalse(dirty_dataset.dirty_state) # Call sync resources sync_resources_with_guardian() clean_dataset = Dataset.objects.get(pk=self._l.id) # Check dirty state self.assertFalse(clean_dataset.dirty_state)
# TODO: DELAYED SECURITY MUST BE REVISED
[docs] def test_sync_resources_with_guardian_delay_true(self): with self.settings(DELAYED_SECURITY_SIGNALS=True, ACL_SECURITY_ENABLED=True): delete_acl_rules_for_layer(self._l) # Set geofence (and so the dirty state) allow_layer_to_all(self._l) # Retrieve the same layer dirty_dataset = Dataset.objects.get(pk=self._l.id) # Check dirty state (True) self.assertTrue(dirty_dataset.dirty_state) # Call sync resources sync_resources_with_guardian()
# clean_dataset = Dataset.objects.get(pk=self._l.id) # Check dirty state # TODO: DELAYED SECURITY MUST BE REVISED # self.assertFalse(clean_dataset.dirty_state)
[docs] class TestGetUserGeolimits(TestCase):
[docs] def setUp(self): self.maxDiff = None self.layer = create_single_dataset("main-layer") self.owner = get_user_model().objects.get(username="admin") self.perms = {"*": ""} self.gf_services = _get_gf_services(self.layer, self.perms)
[docs] def test_should_not_disable_cache_for_user_without_geolimits(self): _disable_dataset_cache = has_geolimits(self.layer, self.owner, None) self.assertFalse(_disable_dataset_cache)
[docs] def test_should_disable_cache_for_user_with_geolimits(self): geo_limit, _ = UserGeoLimit.objects.get_or_create(user=self.owner, resource=self.layer) self.layer.users_geolimits.set([geo_limit]) self.layer.refresh_from_db() _disable_dataset_cache = has_geolimits(self.layer, self.owner, None) self.assertTrue(_disable_dataset_cache)
[docs] def test_should_not_disable_cache_for_anonymous_without_geolimits(self): _disable_dataset_cache = has_geolimits(self.layer, None, None) self.assertFalse(_disable_dataset_cache)
[docs] def test_should_disable_cache_for_anonymous_with_geolimits(self): geo_limit, _ = UserGeoLimit.objects.get_or_create(user=get_anonymous_user(), resource=self.layer) self.layer.users_geolimits.set([geo_limit]) self.layer.refresh_from_db() _disable_dataset_cache = has_geolimits(self.layer, None, None) self.assertTrue(_disable_dataset_cache)
[docs] class SetPermissionsTestCase(GeoNodeBaseTestSupport):
[docs] def setUp(self): # Creating groups and asign also to the anonymous_group self.author, created = get_user_model().objects.get_or_create(username="author") self.group_manager, created = get_user_model().objects.get_or_create(username="group_manager") self.group_member, created = get_user_model().objects.get_or_create(username="group_member") self.not_group_member, created = get_user_model().objects.get_or_create(username="not_group_member") # Defining group profiles and members self.group_profile, created = GroupProfile.objects.get_or_create(slug="custom_group") self.second_custom_group, created = GroupProfile.objects.get_or_create(slug="second_custom_group") # defining group members GroupMember.objects.get_or_create(group=self.group_profile, user=self.author, role="member") GroupMember.objects.get_or_create(group=self.group_profile, user=self.group_manager, role="manager") GroupMember.objects.get_or_create(group=self.group_profile, user=self.group_member, role="member") GroupMember.objects.get_or_create(group=self.second_custom_group, user=self.not_group_member, role="member") # Creating he default resource self.resource = create_single_dataset(name="test_layer", owner=self.author, group=self.group_profile.group) self.anonymous_user = get_anonymous_user()
@override_settings(RESOURCE_PUBLISHING=False) @override_settings(ADMIN_MODERATE_UPLOADS=False)
[docs] def test_set_compact_permissions(self): """ **AUTO PUBLISHING** - test_set_compact_permissions - `RESOURCE_PUBLISHING = False` - `ADMIN_MODERATE_UPLOADS = False` """ use_cases = [ ( PermSpec({"users": {}, "groups": {}}, self.resource).compact, { self.author: [ "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", "delete_resourcebase", "download_resourcebase", "publish_resourcebase", "view_resourcebase", ], self.group_manager: [], self.group_member: [], self.not_group_member: [], self.anonymous_user: [], }, ), ( PermSpec( { "users": {"AnonymousUser": ["view_resourcebase"]}, "groups": {"second_custom_group": ["change_resourcebase"]}, }, self.resource, ).compact, { self.author: [ "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", "delete_resourcebase", "download_resourcebase", "publish_resourcebase", "view_resourcebase", ], self.group_manager: ["view_resourcebase"], self.group_member: ["view_resourcebase"], self.not_group_member: [ "change_resourcebase", "view_resourcebase", "download_resourcebase", "change_resourcebase_metadata", ], self.anonymous_user: ["view_resourcebase"], }, ), ] for counter, item in enumerate(use_cases): permissions, expected = item self.resource.set_permissions(permissions) for authorized_subject, expected_perms in expected.items(): perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #{counter} - user: {authorized_subject.username}", )
@override_settings(RESOURCE_PUBLISHING=True)
[docs] def test_permissions_are_set_as_expected_resource_publishing_True(self): """ **SIMPLE PUBLISHING** - test_permissions_are_set_as_expected_resource_publishing_True - `RESOURCE_PUBLISHING = True` (Autopublishing is disabled) - `ADMIN_MODERATE_UPLOADS = False` """ use_cases = [ ( {"users": {}, "groups": {}}, { self.author: [ "delete_resourcebase", "download_resourcebase", "view_resourcebase", "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", ], self.group_manager: [ "change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase", "download_resourcebase", "change_resourcebase_permissions", "view_resourcebase", "publish_resourcebase", ], self.group_member: ["download_resourcebase", "view_resourcebase"], self.not_group_member: [], self.anonymous_user: [], }, ), ( {"users": [], "groups": {"second_custom_group": ["view_resourcebase"]}}, { self.author: [ "delete_resourcebase", "download_resourcebase", "view_resourcebase", "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", ], self.group_manager: [ "change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase", "download_resourcebase", "view_resourcebase", "change_resourcebase_permissions", "publish_resourcebase", ], self.group_member: ["download_resourcebase", "view_resourcebase"], self.not_group_member: ["view_resourcebase"], self.anonymous_user: [], }, ), ] for counter, item in enumerate(use_cases): permissions, expected = item self.resource.set_permissions(permissions) for authorized_subject, expected_perms in expected.items(): perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #{counter} - user: {authorized_subject.username}", )
@override_settings(RESOURCE_PUBLISHING=True) @override_settings(ADMIN_MODERATE_UPLOADS=True)
[docs] def test_permissions_are_set_as_expected_admin_upload_resource_publishing_True(self): """ **ADVANCED WORKFLOW** - test_permissions_are_set_as_expected_admin_upload_resource_publishing_True - `RESOURCE_PUBLISHING = True` - `ADMIN_MODERATE_UPLOADS = True` """ use_cases = [ ( {"users": {}, "groups": {}}, { self.author: [ "download_resourcebase", "view_resourcebase", ], self.group_manager: [ "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", "publish_resourcebase", "delete_resourcebase", "download_resourcebase", "view_resourcebase", ], self.group_member: ["download_resourcebase", "view_resourcebase"], self.not_group_member: [], self.anonymous_user: [], }, ), ( {"users": {}, "groups": {"second_custom_group": ["view_resourcebase"]}}, { self.author: [ "download_resourcebase", "view_resourcebase", ], self.group_manager: [ "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", "publish_resourcebase", "delete_resourcebase", "download_resourcebase", "view_resourcebase", ], self.group_member: ["download_resourcebase", "view_resourcebase"], self.not_group_member: ["view_resourcebase"], self.anonymous_user: [], }, ), ] try: self.resource.is_approved = True self.resource.is_published = False self.resource.save() for counter, item in enumerate(use_cases): permissions, expected = item self.resource.set_permissions(permissions) for authorized_subject, expected_perms in expected.items(): perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #{counter} - user: {authorized_subject.username}", ) finally: self.resource.is_approved = True self.resource.is_published = True self.resource.save()
@override_settings(RESOURCE_PUBLISHING=False) @override_settings(ADMIN_MODERATE_UPLOADS=False)
[docs] def test_permissions_are_set_as_expected_admin_upload_resource_publishing_False(self): """ **AUTO PUBLISHING** - test_permissions_are_set_as_expected_admin_upload_resource_publishing_False - `RESOURCE_PUBLISHING = False` - `ADMIN_MODERATE_UPLOADS = False` """ use_cases = [ ( {"users": {}, "groups": {}}, { self.author: [ "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", "delete_resourcebase", "download_resourcebase", "publish_resourcebase", "view_resourcebase", ], self.group_manager: [], self.group_member: [], self.not_group_member: [], self.anonymous_user: [], }, ), ( { "users": {"AnonymousUser": ["view_resourcebase"]}, "groups": {"second_custom_group": ["change_resourcebase"]}, }, { self.author: [ "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", "delete_resourcebase", "download_resourcebase", "publish_resourcebase", "view_resourcebase", ], self.group_manager: ["view_resourcebase"], self.group_member: ["view_resourcebase"], self.not_group_member: ["view_resourcebase", "change_resourcebase"], self.anonymous_user: ["view_resourcebase"], }, ), ] for counter, item in enumerate(use_cases): permissions, expected = item self.resource.set_permissions(permissions) for authorized_subject, expected_perms in expected.items(): perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #{counter} - user: {authorized_subject.username}", )
@override_settings(RESOURCE_PUBLISHING=True) @override_settings(ADMIN_MODERATE_UPLOADS=True)
[docs] def test_permissions_on_user_role_promotion_to_manager(self): """ **ADVANCED WORKFLOW** - test_permissions_on_user_role_promotion_to_manager - `RESOURCE_PUBLISHING = True` - `ADMIN_MODERATE_UPLOADS = True` """ sut = GroupMember.objects.filter(user=self.group_member).exclude(group__title="Registered Members").first() expected = { self.author: [ "download_resourcebase", "view_resourcebase", ], self.group_manager: [ "change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase", "download_resourcebase", "view_resourcebase", "publish_resourcebase", "change_resourcebase_permissions", ], self.group_member: [ "change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase", "download_resourcebase", "view_resourcebase", "publish_resourcebase", "change_resourcebase_permissions", ], } try: self.resource.is_approved = True self.resource.is_published = False self.resource.save() self.assertEqual(sut.role, "member") sut.promote() sut.refresh_from_db() self.assertEqual(sut.role, "manager") for authorized_subject, expected_perms in expected.items(): perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}" ) finally: self.resource.is_approved = True self.resource.is_published = True self.resource.save() sut.demote()
@override_settings(RESOURCE_PUBLISHING=True) @override_settings(ADMIN_MODERATE_UPLOADS=True)
[docs] def test_permissions_on_user_role_demote_to_member(self): """ **ADVANCED WORKFLOW** - test_permissions_on_user_role_demote_to_member - `RESOURCE_PUBLISHING = True` - `ADMIN_MODERATE_UPLOADS = True` """ sut = GroupMember.objects.filter(user=self.group_manager).exclude(group__title="Registered Members").first() self.assertEqual(sut.role, "manager") sut.demote() sut.refresh_from_db() self.assertEqual(sut.role, "member") expected = { self.author: [ "download_resourcebase", "view_resourcebase", ], self.group_manager: ["download_resourcebase", "view_resourcebase"], self.group_member: ["download_resourcebase", "view_resourcebase"], } for authorized_subject, expected_perms in expected.items(): perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}" )
@override_settings(RESOURCE_PUBLISHING=True)
[docs] def test_permissions_on_user_role_demote_to_member_only_RESOURCE_PUBLISHING_active(self): """ **SIMPLE PUBLISHING** - test_permissions_on_user_role_demote_to_member_only_RESOURCE_PUBLISHING_active - `RESOURCE_PUBLISHING = True` (Autopublishing is disabled) - `ADMIN_MODERATE_UPLOADS = False` """ sut = GroupMember.objects.filter(user=self.group_manager).exclude(group__title="Registered Members").first() self.assertEqual(sut.role, "manager") sut.demote() sut.refresh_from_db() self.assertEqual(sut.role, "member") expected = { self.author: [ "delete_resourcebase", "download_resourcebase", "view_resourcebase", "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", ], self.group_manager: ["download_resourcebase", "view_resourcebase"], self.group_member: ["download_resourcebase", "view_resourcebase"], } for authorized_subject, expected_perms in expected.items(): perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}" )
@override_settings(RESOURCE_PUBLISHING=True)
[docs] def test_permissions_on_user_role_promote_to_manager_only_RESOURCE_PUBLISHING_active(self): """ **SIMPLE PUBLISHING** - test_permissions_on_user_role_promote_to_manager_only_RESOURCE_PUBLISHING_active - `RESOURCE_PUBLISHING = True` (Autopublishing is disabled) - `ADMIN_MODERATE_UPLOADS = False` """ sut = GroupMember.objects.filter(user=self.group_member).exclude(group__title="Registered Members").first() self.assertEqual(sut.role, "member") sut.promote() sut.refresh_from_db() self.assertEqual(sut.role, "manager") expected = { self.author: [ "delete_resourcebase", "download_resourcebase", "view_resourcebase", "change_resourcebase", "change_resourcebase_metadata", "change_resourcebase_permissions", ], self.group_manager: [ "change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase", "download_resourcebase", "view_resourcebase", "publish_resourcebase", "change_resourcebase_permissions", ], self.group_member: [ "change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase", "download_resourcebase", "view_resourcebase", "publish_resourcebase", "change_resourcebase_permissions", ], } for authorized_subject, expected_perms in expected.items(): perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}" )
@override_settings(RESOURCE_PUBLISHING=True) @override_settings(ADMIN_MODERATE_UPLOADS=True)
[docs] class TestPermissionChanges(GeoNodeBaseTestSupport):
[docs] def setUp(self): # Creating groups self.author, _ = get_user_model().objects.get_or_create(username="author") self.group_manager, _ = get_user_model().objects.get_or_create(username="group_manager") self.resource_group_manager, _ = get_user_model().objects.get_or_create(username="resource_group_manager") self.group_member, _ = get_user_model().objects.get_or_create(username="group_member") self.member_with_perms, _ = get_user_model().objects.get_or_create(username="member_with_perms") # Defining group profiles and members self.owner_group, _ = GroupProfile.objects.get_or_create(slug="owner_group") self.resource_group, _ = GroupProfile.objects.get_or_create(slug="resource_group") # defining group members GroupMember.objects.get_or_create(group=self.owner_group, user=self.author, role="member") GroupMember.objects.get_or_create(group=self.owner_group, user=self.group_member, role="member") GroupMember.objects.get_or_create(group=self.owner_group, user=self.group_manager, role="manager") GroupMember.objects.get_or_create(group=self.resource_group, user=self.resource_group_manager, role="manager") # Creating the default resource self.resource = create_single_dataset( name="test_layer_adv", owner=self.author, is_approved=False, is_published=False, was_approved=False, was_published=False, group=self.resource_group.group, ) self.owner_perms = ["view_resourcebase", "download_resourcebase"] self.edit_perms = ["change_resourcebase", "change_resourcebase_metadata"] self.dataset_perms = ["change_dataset_style", "change_dataset_data"] self.adv_owner_limit = ["delete_resourcebase", "change_resourcebase_permissions", "publish_resourcebase"] self.safe_perms = ["download_resourcebase", "view_resourcebase"] self.data = { "resource-title": self.resource.title, "resource-owner": self.author.id, "resource-date": "2021-10-27 05:59 am", "resource-date_type": "publication", "resource-language": self.resource.language, "resource-is_approved": "on", "resource-group": self.resource_group.group.id, "dataset_attribute_set-TOTAL_FORMS": 0, "dataset_attribute_set-INITIAL_FORMS": 0, } self.url = reverse("dataset_metadata", args=(self.resource.alternate,)) # Assign manage perms to user member_with_perms for perm in self.dataset_perms: assign_perm(perm, self.member_with_perms, self.resource) for perm in self.owner_perms: assign_perm(perm, self.member_with_perms, self.resource.get_self_resource()) # Assert inital assignment of permissions to groups and users resource_perm_specs = self.resource.get_all_level_info() self.assertSetEqual( set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms) ) self.assertSetEqual( set(resource_perm_specs["users"][self.member_with_perms]), set(self.owner_perms + self.dataset_perms) ) self.assertSetEqual( set(resource_perm_specs["users"][self.group_manager]), set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit), ) self.assertSetEqual( set(resource_perm_specs["users"][self.resource_group_manager]), set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit), ) self.assertSetEqual(set(resource_perm_specs["groups"][self.owner_group.group]), set(self.safe_perms)) self.assertSetEqual(set(resource_perm_specs["groups"][self.resource_group.group]), set(self.safe_perms))
[docs] def test_permissions_on_approve_and_publish_changes(self): # Group manager approves a resource self.group_manager.set_password("group_manager") self.group_manager.save() self.assertTrue(self.client.login(username="group_manager", password="group_manager")) response = self.client.post(self.url, data=self.data) self.assertEqual(response.status_code, 200) self.assertions_for_approved_or_published_is_true() # Un approve resource self.data.pop("resource-is_approved") response = self.client.post(self.url, data=self.data) self.assertEqual(response.status_code, 200) self.assertions_for_approved_and_published_is_false() # Admin publishes and approves resource response = self.admin_approve_and_publish_resource() self.assertEqual(response.status_code, 200) self.assertions_for_approved_or_published_is_true() # Admin Un approves and un publishes resource response = self.admin_unapprove_and_unpublish_resource() self.assertEqual(response.status_code, 200) self.assertions_for_approved_and_published_is_false()
[docs] def test_owner_is_group_manager(self): try: GroupMember.objects.get(group=self.owner_group, user=self.author).promote() # Admin publishes and approves the resource response = self.admin_approve_and_publish_resource() self.assertEqual(response.status_code, 200) resource_perm_specs = self.resource.get_all_level_info() # Once a resource has been published, the 'publish_resourcebase' permission should be removed anyway self.assertSetEqual( set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit), ) # Admin un-approves and un-publishes the resource response = self.admin_unapprove_and_unpublish_resource() self.assertEqual(response.status_code, 200) resource_perm_specs = self.resource.get_all_level_info() self.assertSetEqual( set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit), ) finally: GroupMember.objects.get(group=self.owner_group, user=self.author).demote()
[docs] def assertions_for_approved_or_published_is_true(self): resource_perm_specs = self.resource.get_all_level_info() self.assertSetEqual(set(resource_perm_specs["users"][self.author]), set(self.owner_perms)) self.assertSetEqual( set(resource_perm_specs["users"][self.member_with_perms]), set(self.owner_perms + self.dataset_perms) ) self.assertSetEqual( set(resource_perm_specs["users"][self.group_manager]), set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit), ) self.assertSetEqual( set(resource_perm_specs["users"][self.resource_group_manager]), set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit), ) self.assertSetEqual(set(resource_perm_specs["groups"][self.owner_group.group]), set(self.safe_perms)) self.assertSetEqual(set(resource_perm_specs["groups"][self.resource_group.group]), set(self.safe_perms))
[docs] def assertions_for_approved_and_published_is_false(self): resource_perm_specs = self.resource.get_all_level_info() self.assertSetEqual( set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms) ) self.assertSetEqual( set(resource_perm_specs["users"][self.member_with_perms]), set(self.owner_perms + self.dataset_perms) ) self.assertSetEqual( set(resource_perm_specs["users"][self.group_manager]), set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit), ) self.assertSetEqual( set(resource_perm_specs["users"][self.resource_group_manager]), set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit), ) self.assertSetEqual(set(resource_perm_specs["groups"][self.owner_group.group]), set(self.safe_perms)) self.assertSetEqual(set(resource_perm_specs["groups"][self.resource_group.group]), set(self.safe_perms))
[docs] def admin_approve_and_publish_resource(self): self.assertTrue(self.client.login(username="admin", password="admin")) self.data["resource-is_approved"] = "on" self.data["resource-is_published"] = "on" response = self.client.post(self.url, data=self.data) self.resource.refresh_from_db() return response
[docs] def admin_unapprove_and_unpublish_resource(self): self.assertTrue(self.client.login(username="admin", password="admin")) self.data.pop("resource-is_approved") self.data.pop("resource-is_published") response = self.client.post(self.url, data=self.data) self.resource.refresh_from_db() return response
[docs] class TestUserHasPerms(GeoNodeBaseTestSupport): """ Ensure that the Permission classes behaves as expected """ @classmethod
[docs] def setUpClass(cls) -> None: super().setUpClass() cls.dataset = create_single_dataset(name="test_permission_dataset") cls.document = create_single_doc(name="test_permission_doc") cls.map = create_single_map(name="test_permission_map")
@classmethod
[docs] def tearDownClass(self) -> None: Dataset.objects.filter(name="test_permission_dataset").delete() Document.objects.filter(title="test_permission_doc").delete() Map.objects.filter(title="test_permission_map").delete()
[docs] def setUp(self): self.marty, _ = get_user_model().objects.get_or_create(username="marty", password="mcfly")
[docs] def test_user_with_view_perms(self): use_cases = [ {"resource": self.dataset, "url": "base-resources-detail"}, {"resource": self.dataset, "url": "datasets-detail"}, {"resource": self.document, "url": "documents-detail"}, {"resource": self.map, "url": "maps-detail"}, ] for _case in use_cases: # setting the view permissions url = reverse(_case["url"], kwargs={"pk": _case["resource"].pk}) _case["resource"].set_permissions( {"users": {self.marty.username: ["base.view_resourcebase", "base.download_resourcebase"]}} ) # calling the api self.client.force_login(self.marty) result = self.client.get(url) # checking that the user can call the url in get self.assertEqual(200, result.status_code, _case) # the user cannot patch the resource result = self.client.patch(url) # checking that the user cannot call the url in patch due the lack of permissions self.assertEqual(403, result.status_code, _case) # after update the permissions list, the user can modify the resource _case["resource"].set_permissions( {"users": {self.marty.username: ["base.view_resourcebase", "base.change_resourcebase"]}} ) # the user can patch the resource result = self.client.patch(url) # checking that the user can call the url in patch since now it has the permissions self.assertEqual(200, result.status_code, _case)
[docs] def test_user_with_view_listing(self): use_cases = [ {"resource": self.dataset, "url": "base-resources-list"}, {"resource": self.dataset, "url": "datasets-list"}, {"resource": self.document, "url": "documents-list"}, {"resource": self.map, "url": "maps-list"}, ] for _case in use_cases: # setting the view permissions url = reverse(_case["url"]) _case["resource"].set_permissions( {"users": {self.marty.username: ["base.view_resourcebase", "base.download_resourcebase"]}} ) # calling the api self.client.force_login(self.marty) result = self.client.get(url) # checking that the user can call the url in get self.assertEqual(200, result.status_code, _case) # the user cannot patch the resource result = self.client.patch(url) # checking that the user cannot call the url in patch due the lack of permissions self.assertEqual(403, result.status_code, _case)
[docs] def test_anonymous_user_is_stripped_off(self): from geonode.base.models import ResourceBase perms = ["base.view_resourcebase", "base.download_resourcebase"] resource = ResourceBase.objects.get(id=self.dataset.id) for perm in perms: assign_perm(perm, get_anonymous_user(), resource) assign_perm(perm, Group.objects.get(name="anonymous"), resource) perm_spec = resource.get_all_level_info() anonymous_user_perm = perm_spec["users"].get(get_anonymous_user()) self.assertEqual(anonymous_user_perm, None, "Anynmous user wasn't removed")